MySQL Binlog 解析工具 Maxwell

MySQL Binlog 解析工具 Maxwell

常用的MySQL Binlog解析工具主要有阿里的canal、mysql_streamer,三个工具对比如下:

ainyu-dp2b2

maxwell 简介

Maxwell是一个能实时读取MySQL二进制日志binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。它的常见应用场景有ETL、维护缓存、收集表级别的dml指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。官网(http://maxwells-daemon.io)、GitHub(https://github.com/zendesk/maxwell)

Maxwell主要提供了下列功能:

  • 支持 SELECT * FROM table 的方式进行全量数据初始化
  • 支持在主库发生failover后,自动恢复binlog位置(GTID)
  • 可以对数据进行分区,解决数据倾斜问题,发送到kafka的数据支持database、table、column等级别的数据分区
  • 工作方式是伪装为Slave,接收binlog events,然后根据schemas信息拼装,可以接受ddl、xid、row等各种event

canal 由Java开发,分为服务端和客户端,拥有众多的衍生应用,性能稳定,功能强大;canal 需要自己编写客户端来消费canal解析到的数据。

maxwell相对于canal的优势是使用简单,它直接将数据变更输出为json字符串,不需要再编写客户端。

mysql启动binlog

1
2
3
4
[mysqld]
server_id=1
log-bin=master
binlog_format=row

mysql创建用户

Maxwell用户,并赋予 maxwell 库的一些权限

1
2
3
CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';

启动kafka集群

docker 快速安装并使用 Maxwell

1
2
3
4
5
# 拉取镜像 
docker pull zendesk/maxwell

# 启动maxwell,并将解析出的binlog输出到控制台
docker run -ti --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='123456' --host='10.100.97.246' --producer=stdout

测试Maxwell表

首先创建一张简单的表,然后增改删数据

1
2
3
4
5
6
7
8
9
10
CREATE TABLE `test` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`age` int(11) DEFAULT NULL,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into test values(1,22,"小旋锋");
update test set name='whirly' where id=1;
delete from test where id=1;

观察docker控制台的输出,从输出的日志中可以看出Maxwell解析出的binlog的JSON字符串的格式

1
2
3
{"database":"test","table":"test","type":"insert","ts":1552153502,"xid":832,"commit":true,"data":{"id":1,"age":22,"name":"小旋锋"}}
{"database":"test","table":"test","type":"update","ts":1552153502,"xid":833,"commit":true,"data":{"id":1,"age":22,"name":"whirly"},"old":{"name":"小旋锋"}}
{"database":"test","table":"test","type":"delete","ts":1552153502,"xid":834,"commit":true,"data":{"id":1,"age":22,"name":"whirly"}}

输出到 Kafka,关闭 docker,重新设置启动参数

1
2
3
docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
--password='123456' --host='10.100.97.246' --producer=kafka \
--kafka.bootstrap.servers='10.100.97.246:9092' --kafka_topic=maxwell --log_level=debug

然后启动一个消费者来消费 maxwell topic的消息,观察其输出;再一次执行增改删数据的SQL,仍然可以得到相同的输出

1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic maxwell

可以把Maxwell的启动参数写到一个配置文件 config.properties 中,然后通过 config 选项指定,bin/maxwell --config config.properties

1
2
3
4
5
6
user=maxwell
password=123456
host=10.100.97.246
producer=kafka
kafka.bootstrap.servers=10.100.97.246:9092
kafka_topic=maxwell

Maxwell 会将消息投递到Kafka的Topic中,该Topic由 kafka_topic 选项指定,默认值为 maxwell,除了指定为静态的Topic,还可以指定为动态的,譬如 namespace_%{database}_%{table}%{database}%{table} 将被具体的消息的 database 和 table 替换。

解析输出JSON字符串

具体见其他文档

参考地址

https://juejin.im/post/5c8616fc5188251bde6bd43a

-------------本文结束-------------
0%